package com.chenjl.trace.transport.extflume;

import java.nio.charset.StandardCharsets;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
/**
 * Custom Flume sink that takes events from its channel and pushes each event body
 * onto a Redis list.
 * <p>
 * An Event is the smallest unit Flume transports: a source wraps incoming data in an
 * event and hands it to a channel, and the sink consumes events from that channel.
 * An event consists of headers and a body; only the body is written to Redis here.
 * <p>
 * Example agent configuration:
 * <pre>
 * agent.sinks.sink.type=com.chenjl.trace.transport.extflume.RedisSink
 * agent.sinks.sink.redis_host=dev.cjl.com
 * agent.sinks.sink.redis_port=6379
 * agent.sinks.sink.redis_list_key=flume_event_list_key
 * </pre>
 * Inspecting the list with {@code src/redis-cli}:
 * <pre>
 * rpop flume_event_list_key   # pop the oldest element
 * llen flume_event_list_key   # length of the list
 * </pre>
 *
 * 2018-8-20 17:19:05
 * @author chenjinlong
 */
public class RedisSink extends AbstractSink implements Configurable {
	private static final Logger log = LoggerFactory.getLogger(RedisSink.class);

	/** Port used when {@code redis_port} is not configured (the standard Redis port). */
	private static final int DEFAULT_REDIS_PORT = 6379;

	private String redisHost;
	private int redisPort = DEFAULT_REDIS_PORT;
	private String redisListKey;
	private JedisPool jedisPool;


	/**
	 * Reads {@code redis_host}, {@code redis_port} and {@code redis_list_key} from the
	 * sink configuration and creates the connection pool.
	 *
	 * @param context the sink's configuration
	 * @throws IllegalArgumentException if {@code redis_host} or {@code redis_list_key}
	 *         is missing or blank
	 */
	@Override
	public void configure(Context context) {
		// Fail fast on missing settings instead of failing later on every event.
		String host = requireSetting(context, "redis_host");
		String listKey = requireSetting(context, "redis_list_key");
		// A missing port previously caused an unboxing NullPointerException.
		int port = context.getInteger("redis_port", DEFAULT_REDIS_PORT);

		this.redisHost = host;
		this.redisPort = port;
		this.redisListKey = listKey;

		log.info("RedisSink读取配置-->>---redisHost: {},redisPort : {}, redisListKey : {}",redisHost,redisPort,redisListKey);

		// Release a pool left over from an earlier configure() call so it is not leaked.
		JedisPool previousPool = this.jedisPool;
		this.jedisPool = new JedisPool(redisHost, redisPort);
		if (previousPool != null) {
			previousPool.close();
		}
	}

	/** Starts the sink and logs the start-up. */
	@Override
	public synchronized void start() {
		super.start();

		log.info("RedisSink启动-->>---");
	}

	/** Stops the sink and closes the Redis connection pool, if one was created. */
	@Override
	public synchronized void stop() {
		super.stop();

		log.info("RedisSink关闭-->>---");
		// The pool is null when stop() is reached without a successful configure().
		if (this.jedisPool != null) {
			this.jedisPool.close();
			this.jedisPool = null;
		}
	}

	/**
	 * Takes at most one event from the channel inside a transaction and LPUSHes its
	 * body, decoded as UTF-8, onto the configured Redis list.
	 *
	 * @return {@code READY} when an event was delivered; {@code BACKOFF} when the
	 *         channel was empty or delivery failed (the transaction is rolled back
	 *         on failure)
	 * @throws EventDeliveryException declared by the sink contract; failures are
	 *         reported through {@code BACKOFF} rather than by throwing
	 */
	@Override
	public Status process() throws EventDeliveryException {
		Status status = Status.READY;
		Channel channel = getChannel();
		Transaction transaction = channel.getTransaction();
		transaction.begin();

		try {
			Event event = channel.take();
			if (event == null) {
				status = Status.BACKOFF;
			}
			else {
				String content = new String(event.getBody(), StandardCharsets.UTF_8);

				// try-with-resources returns the connection to the pool even if lpush fails.
				try (Jedis jedis = jedisPool.getResource()) {
					// LPUSH here pairs with RPOP on the consumer side: oldest element leaves first.
					jedis.lpush(redisListKey, content);
				}
			}

			// Once begun, a transaction must be either committed or rolled back.
			transaction.commit();
		}
		catch (Throwable t) {
			// The throwable is passed as the last argument so SLF4J logs the stack trace.
			log.error("RedisSink process-->>---,出现未知异常", t);
			transaction.rollback();
			status = Status.BACKOFF;
			// Errors are not recoverable here; propagate them after rolling back.
			if (t instanceof Error) {
				throw (Error) t;
			}
		}
		finally {
			transaction.close();
		}

		// DEBUG rather than INFO: this runs once per process() call.
		log.debug("RedisSink process-->>---, 状态 : {}",status);
		return status;
	}

	/** Returns the trimmed value of a mandatory setting, rejecting missing or blank values. */
	private static String requireSetting(Context context, String key) {
		String value = context.getString(key);
		if (value == null || value.trim().isEmpty()) {
			throw new IllegalArgumentException("RedisSink: required setting '" + key + "' is missing or blank");
		}
		return value.trim();
	}
}